# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import abstractmethod
import sympy as sm
from hysop.constants import BoundaryCondition
from hysop.tools.numpywrappers import npw
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.sympy_utils import get_derivative_variables
from hysop.tools.numerics import find_common_dtype
from hysop.fields.continuous_field import Field, TensorField
from hysop.fields.discrete_field import DiscreteField, DiscreteTensorField
from hysop.symbolic import Symbol
from hysop.symbolic.base import TensorBase, SymbolicTensor
from hysop.symbolic.func import (
UndefinedFunction,
AppliedSymbolicFunction,
FunctionBase,
SymbolicFunctionTensor,
)
from hysop.domain.domain import Domain
[docs]
class FieldExpressionI:
def __new__(cls, *args, **kwds):
return super().__new__(cls, *args, **kwds)
def __init__(self, *args, **kwds):
super().__init__(**kwds)
[docs]
@abstractmethod
def lboundaries(self):
pass
[docs]
@abstractmethod
def rboundaries(self):
pass
[docs]
@abstractmethod
def domain(self):
pass
[docs]
@abstractmethod
def dtype(self):
pass
@property
def boundaries(self):
return (self.lboundaries, self.rboundaries)
[docs]
class FieldExpression(FieldExpressionI):
def __new__(cls, *args, **kwds):
domain = kwds.pop("domain", None)
dtype = kwds.pop("dtype", None)
lboundaries = kwds.pop("lboundaries", None)
rboundaries = kwds.pop("rboundaries", None)
obj = super().__new__(cls, *args, **kwds)
obj._domain = domain
obj._dtype = dtype
obj._lboundaries = lboundaries
obj._rboundaries = rboundaries
return obj
def __init__(self, *args, **kwds):
super().__init__(*args, **kwds)
@property
def lboundaries(self):
assert self._lboundaries is not None
return self._lboundaries
@lboundaries.setter
def lboundaries(self, lb):
check_instance(
lb, npw.ndarray, values=BoundaryCondition, size=self.domain.dim, ndim=1
)
self._lboundaries = lb
@property
def rboundaries(self):
assert self._rboundaries is not None
return self._rboundaries
@rboundaries.setter
def rboundaries(self, rb):
check_instance(
rb, npw.ndarray, values=BoundaryCondition, size=self.domain.dim, ndim=1
)
self._rboundaries = rb
@property
def domain(self):
assert self._domain is not None
return self._domain
@domain.setter
def domain(self, dom):
assert self._domain is None
check_instance(dom, Domain)
self._domain = dom
@property
def dtype(self):
assert self._dtype is not None
return self._dtype
@dtype.setter
def dtype(self, dt):
assert self._dtype is None
check_instance(dt, npw.dtype)
self._dtype = dt
[docs]
class DerivativeFieldExpr(FieldExpression, sm.Derivative):
pass
[docs]
class FieldExpressionBuilder:
[docs]
class BoundaryIncompatibilityError(ValueError):
pass
[docs]
class InvalidExpression(ValueError):
pass
[docs]
@classmethod
def is_field_expr(cls, expr):
return isinstance(expr, FieldExpressionI)
[docs]
@classmethod
def update_boundaries(cls, boundary, order):
from hysop.constants import BoundaryCondition
if (order % 2) == 0:
return boundary
elif boundary is BoundaryCondition.PERIODIC:
return BoundaryCondition.PERIODIC
elif boundary is BoundaryCondition.HOMOGENEOUS_DIRICHLET:
return BoundaryCondition.HOMOGENEOUS_NEUMANN
elif boundary is BoundaryCondition.HOMOGENEOUS_NEUMANN:
return BoundaryCondition.HOMOGENEOUS_DIRICHLET
else:
msg = "FATAL ERROR: Unknown boundary condition {}."
msg = msg.format(bd)
raise NotImplementedError(msg)
[docs]
@classmethod
def to_field_expression(cls, expr, space_symbols, strict=True):
check_instance(expr, sm.Expr)
def _to_field_expression_impl(expr):
if cls.is_field_expr(expr):
return expr
elif isinstance(expr, sm.Derivative):
e = _to_field_expression_impl(expr.args[0])
if cls.is_field_expr(e):
dtype, domain = e.dtype, e.domain
lb, rb = (
e.lboundaries.copy(),
e.rboundaries.copy(),
)
assert len(space_symbols) == lb.size == rb.size
for xi in get_derivative_variables(expr):
assert xi in space_symbols, xi
i = space_symbols[::-1].index(xi)
lb[i] = cls.update_boundaries(lb[i], +1)
rb[i] = cls.update_boundaries(rb[i], +1)
expr = DerivativeFieldExpr(e, *expr.args[1:])
expr.domain = domain
expr.dtype = dtype
expr.lboundaries = lb
expr.rboundaries = rb
return expr
else:
return expr
else:
func = expr.func
args = tuple(_to_field_expression_impl(a) for a in expr.args)
field_expression_args = tuple(
filter(lambda x: cls.is_field_expr(x), args)
)
if field_expression_args:
try:
return cls.make_expr(func, *args)
except cls.BoundaryIncompatibilityError:
msg = f"\nError during the handling of expression {expr}."
msg += "\nSome boundaries were not compatible:"
msg += "\n *" + "\n *".join(
f"{a}: {a.format_boundaries()}"
for a in field_expression_args
)
raise cls.BoundaryIncompatibilityError(msg)
else:
return expr
fexpr = _to_field_expression_impl(expr)
if strict and (not cls.is_field_expr(fexpr)):
msg = f"\nError during the handling of expression {expr}."
msg += "\nCould not determine boundaries because no FieldExpression "
msg += "was present in expression."
raise cls.InvalidExpression(msg)
return fexpr
[docs]
@classmethod
def make_expr(cls, func, *args):
check_instance(func, type)
field_expression_args = tuple(filter(lambda x: cls.is_field_expr(x), args))
if not field_expression_args:
msg = "No FieldExpression arguments present in args."
raise ValueError(msg)
if not cls.check_boundary_compatibility(*field_expression_args):
raise cls.BoundaryIncompatibilityError
fea0 = field_expression_args[0]
new_func = type(func.__name__ + "FieldExpr", (FieldExpression, func), {})
new_expr = new_func(*args)
new_expr.dtype = npw.dtype(
find_common_dtype(*tuple(a.dtype for a in field_expression_args))
)
new_expr.domain = fea0.domain
new_expr.lboundaries = fea0.lboundaries.copy()
new_expr.rboundaries = fea0.rboundaries.copy()
return new_expr
[docs]
@classmethod
def check_boundary_compatibility(cls, arg0, *args):
check_instance(args, tuple, values=FieldExpressionI)
domain, lb, rb = arg0.domain, arg0.lboundaries, arg0.rboundaries
if args:
match = all((domain == a.domain) for a in args)
match &= all(all(lb == a.lboundaries) for a in args)
match &= all(all(rb == a.rboundaries) for a in args)
return match
else:
return True
[docs]
class FieldBase(FunctionBase):
def _sympy_(self):
"""for sympify"""
return self
def __new__(cls, field, idx=None, **kwds):
assert "name" not in kwds
assert "pretty_name" not in kwds
assert "latex_name" not in kwds
assert "var_name" not in kwds
check_instance(field, (Field, DiscreteField))
assert (field.nb_components == 1) or (idx is not None), (
field.nb_components,
idx,
)
index = first_not_None(idx, [0])[0]
name = field.name
pretty_name = field.pretty_name
var_name = field.var_name
latex_name = field.latex_name
assert 0 <= index < field.nb_components, index
try:
obj = super().__new__(
cls,
name=name,
pretty_name=pretty_name,
var_name=var_name,
latex_name=latex_name,
**kwds,
)
except TypeError:
obj = super().__new__(cls, name=name, **kwds)
obj._field = field
obj._index = index
return obj
def __init__(self, field, idx=None, **kwds):
try:
super().__init__(
name=None, pretty_name=None, var_name=None, latex_name=None, **kwds
)
except TypeError:
super().__init__(name=None, **kwds)
def _hashable_content(self):
"""See sympy.core.basic.Basic._hashable_content()"""
hc = super()._hashable_content()
hc += (
self._field,
self._index,
)
return hc
@property
def field(self):
"""Get associated field."""
return self._field
@property
def index(self):
"""Get component index of the target field."""
return self._index
@property
def indexed_field(self):
"""Get a unique identifier for an indexed field component."""
return (self._field, self._index)
def __getitem__(self, key):
assert key == 0
return self
[docs]
class SymbolicDiscreteField(FieldBase, Symbol):
"""
Symbolic discrete field symbol.
"""
def __new__(cls, field, name=None, fn=None, **kwds):
check_instance(field, DiscreteField)
return super().__new__(cls, field=field, fn=fn, **kwds)
def __init__(self, field, name=None, fn=None, **kwds):
super().__init__(field=field, fn=fn, **kwds)
[docs]
@classmethod
def from_field(cls, field):
if field.nb_components == 1:
return cls(field=field)
else:
return SymbolicDiscreteFieldTensor(field=field)
[docs]
class SymbolicField(FieldBase, UndefinedFunction):
"""
Symbolic unapplied scalar field as an undefined function of some frame coordinates and time.
This is a metaclass.
"""
def __new__(cls, field, fn=None, bases=None, **kwds):
bases = first_not_None(bases, (AppliedSymbolicField,))
check_instance(field, Field)
return super().__new__(cls, bases=bases, field=field, fn=fn, **kwds)
def __init__(self, field, fn=None, bases=None, **kwds):
super().__init__(bases=bases, field=field, fn=fn, **kwds)
[docs]
def __hash__(self):
"Fix sympy v1.2 hashes"
h = super().__hash__()
for hc in (self.field, self.index):
h ^= hash(h)
return h
[docs]
def __eq__(self, other):
"Fix sympy v1.2 eq"
eq = super().__eq__(other)
if eq is not True:
return eq
for lhc, rhc in zip((self.field, self.index), (other.field, other.index)):
eq &= lhc == rhc
return eq
[docs]
def __ne__(self, other):
"Fix sympy v1.2 neq"
return not (self == other)
[docs]
class AppliedSymbolicField(FieldExpressionI, AppliedSymbolicFunction):
"""Applied scalar fields, hold a reference to a continuous field."""
def __new__(cls, *args, **kwds):
args = args if args else cls.field.domain.frame.vars
return super().__new__(cls, *args, **kwds)
def __init__(self, *args, **kwds):
super().__init__(*args, **kwds)
def _sympy_(self):
"""for sympify"""
return self
def _hashable_content(self):
"""See sympy.core.basic.Basic._hashable_content()"""
hc = super()._hashable_content()
hc += (
self.field,
self.index,
)
return hc
@property
def field(self):
return type(self).field
@property
def index(self):
"""Get component index of the target field."""
return type(self).index
@property
def indexed_field(self):
"""Get a unique identifier for an indexed field component."""
return (self.field, self.index)
@property
def lboundaries(self):
return self.field.lboundaries
@property
def rboundaries(self):
return self.field.rboundaries
@property
def domain(self):
return self.field.domain
@property
def dtype(self):
return self.field.dtype
[docs]
class SymbolicFieldTensor(SymbolicFunctionTensor):
"""Symbolic tensor symbol."""
def __new__(cls, field, **kwds):
check_instance(field, TensorField)
shape = field.shape
init = npw.empty(shape=shape, dtype=object)
for idx, field in field.nd_iter():
init[idx] = field.symbol
return super().__new__(cls, shape=shape, init=init)
def __init__(self, field, **kwds):
super().__init__(shape=None, init=None)
[docs]
class SymbolicDiscreteFieldTensor(TensorBase):
"""Symbolic tensor symbol."""
def __new__(cls, dfield, name=None, **kwds):
from hysop.fields.discrete_field import DiscreteTensorField
check_instance(dfield, DiscreteTensorField)
shape = dfield.shape
init = npw.empty(shape=shape, dtype=object)
for idx, df in dfield.nd_iter():
init[idx] = df.symbol
return super().__new__(cls, shape=shape, init=init, **kwds)
def __init__(self, dfield, name=None, **kwds):
super().__init__(shape=None, init=None, **kwds)
[docs]
def diff(F, *symbols, **assumptions):
is_tensor = isinstance(F, npw.ndarray)
if is_tensor:
return F.view(TensorBase).diff(*symbols, **assumptions)
else:
return sm.diff(F, *symbols, **assumptions)
[docs]
def grad(F, frame, axis=-1):
is_tensor = isinstance(F, npw.ndarray)
if is_tensor:
shape = F.shape
ndim = max(F.ndim, 1)
axis = (axis + ndim) % ndim
new_shape = shape[: axis + 1] + (frame.dim,) + shape[axis + 1 :]
else:
assert isinstance(F, sm.Expr)
assert (axis == -1) or (axis == 0)
shape = (1,)
new_shape = (frame.dim,)
gradF = npw.ndarray(shape=new_shape, dtype=object)
for idx in npw.ndindex(*shape):
for i, xp in enumerate(frame.coords):
if is_tensor:
new_idx = idx[: axis + 1] + (i,) + idx[axis + 1 :]
gradF[new_idx] = diff(F[idx], xp)
else:
gradF[i] = diff(F, xp)
return gradF.view(TensorBase)
[docs]
def div(F, frame, axis=-1):
if isinstance(F, npw.ndarray):
assert F.shape[axis] == frame.dim
shape = F.shape
ndim = F.ndim
axis = (axis + ndim) % ndim
divF = npw.empty_like(F)
for idx in npw.ndindex(*shape):
divF[idx] = diff(F[idx], frame.coords[idx[axis]])
divF = divF.sum(axis=axis)
try:
if divF.size == 1:
return divF.item()
else:
return divF.view(TensorBase)
except AttributeError:
return divF
else:
assert frame.dim == 1
return F.diff(frame.coords[0])
[docs]
def rot(F, frame):
F = npw.atleast_1d(F)
assert F.ndim == 1, F.ndim
assert frame.dim in (2, 3)
X = frame.coords
if frame.dim == 2:
if F.size == 1:
rotF = npw.asarray(
[
+diff(F[0], X[1]),
-diff(F[0], X[0]),
]
)
return rotF.view(TensorBase)
elif F.size == 2:
return diff(F[1], X[0]) - diff(F[0], X[1])
else:
raise ValueError(F.size)
elif frame.dim == 3:
if F.size == 3:
rotF = npw.empty_like(F)
rotF[0] = diff(F[2], X[1]) - diff(F[1], X[2])
rotF[1] = diff(F[0], X[2]) - diff(F[2], X[0])
rotF[2] = diff(F[1], X[0]) - diff(F[0], X[1])
return rotF.view(TensorBase)
else:
raise ValueError(F.size)
else:
raise ValueError(frame.dim)
[docs]
def curl(*args, **kwds):
return rot(*args, **kwds)
[docs]
def laplacian(F, frame):
return div(grad(F, frame), frame)
[docs]
def convective_derivative(U, F, frame):
return diff(F, frame.time) + grad(F, frame).dot(U)
if __name__ == "__main__":
from hysop import Field, Box
from hysop.tools.contexts import printoptions
from hysop.tools.sympy_utils import enable_pretty_printing
enable_pretty_printing()
dim = 3
box = Box(length=(1,) * dim)
frame = box.frame
S0 = Field(domain=box, name="S0", nb_components=1, dtype=npw.float32)
S1 = Field(domain=box, name="S1", nb_components=1, dtype=npw.float64)
U = Field(domain=box, name="U", is_vector=True, dtype=npw.uint8)
V = Field(domain=box, name="V", nb_components=2, dtype=npw.int32)
s0 = S0.s
s1 = S1.s
u = U.s
v = V.s
assert s0.field is S0
assert s1.field is S1
assert u[0].field is U
assert v[-1].field is V
with printoptions(linewidth=1000):
print(s0, s0.index, s0.field.short_description())
print()
print(s1, s1.index, s1.field.short_description())
print()
print(u[0], u[0].index, u[0].field.short_description())
print(u[1], u[1].index, u[1].field.short_description())
print(u[2], u[2].index, u[2].field.short_description())
print()
print(v)
print(v[0], v[0].index, v[0].field.short_description())
print(v[1], v[1].index, v[1].field.short_description())
print()
print(s0(*frame.vars))
print(s1(*frame.vars))
print(u(*frame.vars))
print(v(*frame.vars))
print()
print(grad(s0(*frame.vars), frame))
print()
print(grad(s0(frame.coords[0]), frame))
print()
print(grad(s1(frame.coords[-1]), frame))
print()
print(grad(v(*frame.vars), frame))
print()
print(grad(u(*frame.vars), frame))
print()
print(grad(u(frame.coords[1]), frame))
print()
print(div(u(*frame.vars), frame))
print()
print(div(grad(s0(*frame.vars), frame), frame))
print()
print(div(grad(v(*frame.vars), frame), frame))
print()
print(laplacian(s0(*frame.vars), frame))
print()
print(rot(u(*frame.vars), frame))
print()
print(convective_derivative(u(*frame.vars), u(*frame.vars), frame))
print()
print(npw.eye(8, dtype=npw.uint8).view(TensorBase).latex())
print()
print(convective_derivative(u(*frame.vars), u(*frame.vars), frame).latex())
print()
D = SymbolicTensor("D", shape=(3, 3))
print(D)
print(div(D.dot(grad(s0(*frame.vars), frame)), frame))
print()
print(div(D.dot(grad(u(*frame.vars), frame)), frame))
print()
print(D.dot(grad(s0(*frame.vars), frame)))
print()
print(grad(div(u(*frame.vars), frame), frame) - div(grad(u(), frame), frame))